From 7c8ea19b99146a2a57abe792ef30674340dd34a0 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 11 Jul 2014 08:57:47 -0700 Subject: [PATCH] Refactor the parallel job queue to its own module --- src/cargo/ops/cargo_rustc/job_queue.rs | 109 +++++++++++++++++++++++++ src/cargo/ops/cargo_rustc/mod.rs | 89 ++------------------ 2 files changed, 114 insertions(+), 84 deletions(-) create mode 100644 src/cargo/ops/cargo_rustc/job_queue.rs diff --git a/src/cargo/ops/cargo_rustc/job_queue.rs b/src/cargo/ops/cargo_rustc/job_queue.rs new file mode 100644 index 000000000..99a21c17b --- /dev/null +++ b/src/cargo/ops/cargo_rustc/job_queue.rs @@ -0,0 +1,109 @@ +use std::collections::HashMap; +use term::color::YELLOW; + +use core::Package; +use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness}; +use util::CargoResult; + +use super::job::Job; + +pub struct JobQueue<'a, 'b> { + pool: TaskPool, + queue: DependencyQueue<(&'a Package, Job)>, + tx: Sender, + rx: Receiver, + active: HashMap, + config: &'b mut Config<'b>, +} + +type Message = (String, Freshness, CargoResult>); + +impl<'a, 'b> JobQueue<'a, 'b> { + pub fn new(config: &'b mut Config<'b>, + jobs: Vec<(&'a Package, Freshness, Job)>) -> JobQueue<'a, 'b> { + let (tx, rx) = channel(); + let mut queue = DependencyQueue::new(); + for &(pkg, _, _) in jobs.iter() { + queue.register(pkg); + } + for (pkg, fresh, job) in jobs.move_iter() { + queue.enqueue(pkg, fresh, (pkg, job)); + } + + JobQueue { + pool: TaskPool::new(config.jobs()), + queue: queue, + tx: tx, + rx: rx, + active: HashMap::new(), + config: config, + } + } + + /// Execute all jobs necessary to build the dependency graph. + /// + /// This function will spawn off `config.jobs()` workers to build all of the + /// necessary dependencies, in order. Freshness is propagated as far as + /// possible along each dependency chain. + pub fn execute(&mut self) -> CargoResult<()> { + // Iteratively execute the dependency graph. Each turn of this loop will + // schedule as much work as possible and then wait for one job to finish, + // possibly scheduling more work afterwards. + while self.queue.len() > 0 { + loop { + match self.queue.dequeue() { + Some((name, Fresh, (pkg, _))) => { + assert!(self.active.insert(name.clone(), 1u)); + try!(self.config.shell().status("Fresh", pkg)); + self.tx.send((name, Fresh, Ok(Vec::new()))); + } + Some((name, Dirty, (pkg, job))) => { + assert!(self.active.insert(name.clone(), 1)); + try!(self.config.shell().status("Compiling", pkg)); + let my_tx = self.tx.clone(); + self.pool.execute(proc() my_tx.send((name, Dirty, job.run()))); + } + None => break, + } + } + + // Now that all possible work has been scheduled, wait for a piece + // of work to finish. If any package fails to build then we stop + // scheduling work as quickly as possibly. + let (name, fresh, result) = self.rx.recv(); + *self.active.get_mut(&name) -= 1; + match result { + Ok(v) => { + for job in v.move_iter() { + *self.active.get_mut(&name) += 1; + let my_tx = self.tx.clone(); + let my_name = name.clone(); + self.pool.execute(proc() { + my_tx.send((my_name, fresh, job.run())); + }); + } + if *self.active.get(&name) == 0 { + self.active.remove(&name); + self.queue.finish(&name, fresh); + } + } + Err(e) => { + if *self.active.get(&name) == 0 { + self.active.remove(&name); + } + if self.active.len() > 0 && self.config.jobs() > 1 { + try!(self.config.shell().say( + "Build failed, waiting for other \ + jobs to finish...", YELLOW)); + for _ in self.rx.iter() {} + } + return Err(e) + } + } + } + + log!(5, "rustc jobs completed"); + + Ok(()) + } +} diff --git a/src/cargo/ops/cargo_rustc/mod.rs b/src/cargo/ops/cargo_rustc/mod.rs index 9d28b09ed..067acd0dd 100644 --- a/src/cargo/ops/cargo_rustc/mod.rs +++ b/src/cargo/ops/cargo_rustc/mod.rs @@ -1,17 +1,16 @@ -use std::collections::HashMap; -use term::color::YELLOW; - use core::{Package, PackageSet, Target, Resolve}; use util; use util::{CargoResult, ProcessBuilder, CargoError, human}; -use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness}; +use util::{Config, Freshness}; use self::job::Job; +use self::job_queue::JobQueue; use self::context::Context; -mod job; mod context; mod fingerprint; +mod job; +mod job_queue; type Args = Vec; @@ -77,7 +76,7 @@ pub fn compile_targets<'a>(env: &str, targets: &[&Target], pkg: &Package, try!(compile(targets, pkg, &mut cx, &mut jobs)); // Now that we've figured out everything that we're going to do, do it! - execute(cx.config, jobs) + JobQueue::new(cx.config, jobs).execute() } fn compile<'a>(targets: &[&Target], pkg: &'a Package, cx: &mut Context, @@ -276,81 +275,3 @@ fn build_deps_args(dst: &mut Args, package: &Package, cx: &Context) { cx.target_filename(target))); } } - -/// Execute all jobs necessary to build the dependency graph. -/// -/// This function will spawn off `config.jobs()` workers to build all of the -/// necessary dependencies, in order. Freshness is propagated as far as possible -/// along each dependency chain. -fn execute(config: &mut Config, - jobs: Vec<(&Package, Freshness, Job)>) -> CargoResult<()> { - let pool = TaskPool::new(config.jobs()); - let (tx, rx) = channel(); - let mut queue = DependencyQueue::new(); - for &(pkg, _, _) in jobs.iter() { - queue.register(pkg); - } - for (pkg, fresh, job) in jobs.move_iter() { - queue.enqueue(pkg, fresh, (pkg, job)); - } - - // Iteratively execute the dependency graph. Each turn of this loop will - // schedule as much work as possible and then wait for one job to finish, - // possibly scheduling more work afterwards. - let mut active = HashMap::new(); - while queue.len() > 0 { - loop { - match queue.dequeue() { - Some((name, Fresh, (pkg, _))) => { - assert!(active.insert(name.clone(), 1u)); - try!(config.shell().status("Fresh", pkg)); - tx.send((name, Fresh, Ok(Vec::new()))); - } - Some((name, Dirty, (pkg, job))) => { - assert!(active.insert(name.clone(), 1)); - try!(config.shell().status("Compiling", pkg)); - let my_tx = tx.clone(); - pool.execute(proc() my_tx.send((name, Dirty, job.run()))); - } - None => break, - } - } - - // Now that all possible work has been scheduled, wait for a piece of - // work to finish. If any package fails to build then we stop scheduling - // work as quickly as possibly. - let (name, fresh, result) = rx.recv(); - *active.get_mut(&name) -= 1; - match result { - Ok(v) => { - for job in v.move_iter() { - *active.get_mut(&name) += 1; - let my_tx = tx.clone(); - let my_name = name.clone(); - pool.execute(proc() { - my_tx.send((my_name, fresh, job.run())); - }); - } - if *active.get(&name) == 0 { - active.remove(&name); - queue.finish(&name, fresh); - } - } - Err(e) => { - if *active.get(&name) == 0 { - active.remove(&name); - } - if active.len() > 0 && config.jobs() > 1 { - try!(config.shell().say("Build failed, waiting for other \ - jobs to finish...", YELLOW)); - for _ in rx.iter() {} - } - return Err(e) - } - } - } - - log!(5, "rustc jobs completed"); - - Ok(()) -} -- 2.30.2